/*
 * Copyright (C) 2024-present Puter Technologies Inc.
 *
 * This file is part of Puter.
 *
 * Puter is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

// Passed as the `timeout` option of `stuck_detector_stream` when uploading a
// streamed file (presumably milliseconds, i.e. 10 seconds — confirm against
// the stream utility).
const STUCK_STATUS_TIMEOUT = 10 * 1000;
// Delay (milliseconds, given to `setTimeout`) between an upload stream being
// flagged as stuck and an alarm being reported for it.
const STUCK_ALARM_TIMEOUT = 20 * 1000;

import crypto from 'node:crypto';
import path_ from 'node:path';
import { v4 as uuidv4 } from 'uuid';

const { db } = extension.import('data');

const svc_metering = extension.import('service:meteringService');
const svc_trace = extension.import('service:traceService');
const svc_fs = extension.import('service:filesystem');
const { stuck_detector_stream, hashing_stream } = extension.import('core').util.streamutil;

// TODO: filesystem providers should not need to call EventService
const svc_event = extension.import('service:event');

// TODO: filesystem providers REALLY SHOULD NOT implement ACL logic!
const svc_acl = extension.import('service:acl');

// TODO: these services ought to be part of this extension
const svc_size = extension.import('service:sizeService');
const svc_resource = extension.import('service:resourceService');

// Not sure where these really belong yet
const svc_fileCache = extension.import('service:file-cache');

// TODO: depending on mountpoint service will not be necessary
//       once the storage provider is moved to this extension
const svc_mountpoint = extension.import('service:mountpoint');

const {
    APIError,
    Actor,
    Context,
    UserActorType,
    TDetachable,
    MultiDetachable,
} = extension.import('core');

const {
    get_user,
} = extension.import('core').util.helpers;

const {
    ParallelTasks,
} = extension.import('core').util.otelutil;

const {
    TYPE_DIRECTORY,
} = extension.import('core').fs;

const {
    NodeChildSelector,
    NodeUIDSelector,
    NodeInternalIDSelector,
} = extension.import('core').fs.selectors;

const {
    FSNodeContext,
    capabilities,
} = extension.import('fs');

const {
    // MODE_READ,
    MODE_WRITE,
} = extension.import('fs').lock;

// ^ Yep I know, import('fs') and import('core').fs is confusing and
// redundant... this will be cleaned up as the new API is developed

const {
    // MODE_READ,
    RESOURCE_STATUS_PENDING_CREATE,
} = extension.import('fs').resource;

const {
    UploadProgressTracker,
} = extension.import('fs').util;

/**
 * Filesystem provider named `puterfs`.
 *
 * Entry metadata is read and written through the injected
 * `fsEntryController`; file contents are uploaded through the injected
 * `storageController` and read/copied/deleted through the storage object
 * obtained from the mountpoint service or the current context.
 */
export default class PuterFSProvider {
    /**
     * @param {Object} param
     * @param {Object} param.fsEntryController - Performs lookups and
     *   mutations of filesystem entries.
     * @param {Object} param.storageController - Performs uploads of file
     *   contents.
     */
    constructor ({ fsEntryController, storageController }) {
        this.fsEntryController = fsEntryController;
        this.storageController = storageController;
        this.name = 'puterfs';
    }

    // TODO: should this be a static member instead?
    /**
     * List the capabilities this provider supports.
     *
     * @returns {Set} A new Set of `capabilities` values on every call.
     */
    get_capabilities () {
        return new Set([
            capabilities.THUMBNAIL,
            capabilities.UPDATE_THUMBNAIL,
            capabilities.UUID,
            capabilities.OPERATION_TRACE,
            capabilities.READDIR_UUID_MODE,
            capabilities.PUTER_SHORTCUT,

            capabilities.COPY_TREE,
            capabilities.GET_RECURSIVE_SIZE,

            capabilities.READ,
            capabilities.WRITE,
            capabilities.CASE_SENSITIVE,
            capabilities.SYMLINK,
            capabilities.TRASH,
        ]);
    }

    // #region PuterOnly

    /**
     * Replace the thumbnail stored on a node's entry.
     *
     * Emits `fs.write.file` in the background once the entry update is done.
     *
     * @param {Object} param
     * @param {Context} [param.context] - Falls back to the current Context.
     * @param {FSNode} param.node - The node whose thumbnail is updated.
     * @param {*} param.thumbnail - The new thumbnail value.
     * @returns {Promise<FSNode>} The same `node` that was passed in.
     * @throws The ACL service's error when the actor lacks write access.
     */
    async update_thumbnail ({ context, node, thumbnail }) {
        // The fallback has to be applied before anything is read from
        // `context`, otherwise a missing context throws before it can be
        // replaced.
        context = context ?? Context.get();
        const actor = context.values?.actor ?? Context.get('actor');
        const services = context.get('services');

        // TODO: this ACL check should not be here, but there's no LL method yet
        //       and it's possible we will never implement the thumbnail
        //       capability for any other filesystem type

        const acl = services.get('acl');
        if ( ! await acl.check(actor, node, 'write') ) {
            throw await acl.get_safe_acl_error(actor, node, 'write');
        }

        const uid = await node.get('uid');

        const entryOp = await this.fsEntryController.update(uid, {
            thumbnail,
        });

        this.#detach('update-thumbnail.entry-op', async () => {
            await entryOp.awaitDone();
            svc_event.emit('fs.write.file', {
                node,
                context,
            });
        });

        return node;
    }

    /**
     * Create a shortcut entry named `name` under `parent` that points at
     * `target`.
     *
     * @param {Object} param
     * @param {FSNode} param.parent - Directory that receives the shortcut.
     * @param {string} param.name - Name of the shortcut entry.
     * @param {Object} param.user - Owner of the new entry (`user.id` is stored).
     * @param {FSNode} param.target - Node the shortcut refers to.
     * @returns {Promise<FSNode>} Node for the newly created shortcut.
     */
    async puter_shortcut ({ parent, name, user, target }) {
        await target.fetchEntry({ thumbnail: true });

        const ts = Math.round(Date.now() / 1000);
        const uid = uuidv4();

        svc_resource.register({
            uid,
            status: RESOURCE_STATUS_PENDING_CREATE,
        });

        const raw_fsentry = {
            is_shortcut: 1,
            shortcut_to: target.mysql_id,
            is_dir: target.entry.is_dir,
            thumbnail: target.entry.thumbnail,
            uuid: uid,
            parent_uid: await parent.get('uid'),
            path: path_.join(await parent.get('path'), name),
            user_id: user.id,
            name,
            created: ts,
            updated: ts,
            modified: ts,
            immutable: false,
        };

        const entryOp = await this.fsEntryController.insert(raw_fsentry);

        this.#detach('shortcut.entry-op', () => this.#settle_entry_op(entryOp, uid));

        const node = await svc_fs.node(new NodeUIDSelector(uid));

        svc_event.emit('fs.create.shortcut', {
            node,
            context: Context.get(),
        });

        return node;
    }
    // #endregion

    // #region Standard FS

    /**
     * Check if a given node exists.
     *
     * Only selectors that can be answered with a single lookup are
     * supported; any other selector yields `false`.
     *
     * @param {Object} param
     * @param {NodeSelector} param.selector - The selector used for checking.
     * @returns {Promise<boolean>} - True if the node exists, false otherwise.
     */
    async quick_check ({
        selector,
    }) {
        // shortcut: has full path
        if ( selector?.path ) {
            const entry = await this.fsEntryController.findByPath(selector.path);
            return Boolean(entry);
        }

        // shortcut: has uid
        if ( selector?.uid ) {
            const entry = await this.fsEntryController.findByUID(selector.uid);
            return Boolean(entry);
        }

        // shortcut: parent uid + child name
        if ( selector instanceof NodeChildSelector && selector.parent instanceof NodeUIDSelector ) {
            return await this.fsEntryController.nameExistsUnderParent(selector.parent.uid,
                            selector.name);
        }

        // shortcut: parent id + child name
        if ( selector instanceof NodeChildSelector && selector.parent instanceof NodeInternalIDSelector ) {
            return await this.fsEntryController.nameExistsUnderParentID(selector.parent.id,
                            selector.name);
        }

        return false;
    }

    /**
     * Remove a non-directory node.
     *
     * @param {Object} param
     * @param {Context} param.context
     * @param {FSNode} param.node
     * @param {Object} [param.options] - `override_immutable` allows removing
     *   an immutable node.
     * @throws {APIError} 409 when `node` is a directory.
     */
    async unlink ({ context, node, options = {} }) {
        if ( await node.get('type') === TYPE_DIRECTORY ) {
            throw new APIError(409, 'Cannot unlink a directory.');
        }

        await this.#rmnode({ context, node, options });
    }

    /**
     * Remove a directory node.
     *
     * @param {Object} param
     * @param {Context} param.context
     * @param {FSNode} param.node
     * @param {Object} [param.options] - `ignore_not_empty` skips the
     *   emptiness check.
     * @throws {APIError} 409 when `node` is not a directory, `immutable`
     *   when the directory is immutable, `not_empty` when it has children
     *   and `ignore_not_empty` is not set.
     */
    async rmdir ({ context, node, options = {} }) {
        if ( await node.get('type') !== TYPE_DIRECTORY ) {
            throw new APIError(409, 'Cannot rmdir a file.');
        }

        if ( await node.get('immutable') ) {
            throw APIError.create('immutable');
        }

        // The child listing is only needed to enforce emptiness, so it is
        // skipped entirely when the caller opted out of that check.
        if ( ! options.ignore_not_empty ) {
            const children = await this.fsEntryController.fast_get_direct_descendants(await node.get('uid'));
            if ( children.length > 0 ) {
                throw APIError.create('not_empty');
            }
        }

        await this.#rmnode({ context, node, options });
    }

    /**
     * Create a new directory.
     *
     * @param {Object} param
     * @param {Context} param.context
     * @param {FSNode} param.parent
     * @param {string} param.name
     * @param {boolean} param.immutable
     * @returns {Promise<FSNode>}
     * @throws {APIError} `item_with_same_name_exists` when `name` is taken
     *   under `parent`; `subject_does_not_exist` when `parent` is missing.
     */
    async mkdir ({ context, parent, name, immutable }) {
        const { actor, thumbnail } = context.values;

        const ts = Math.round(Date.now() / 1000);
        const uid = uuidv4();

        const existing = await svc_fs.node(new NodeChildSelector(parent.selector, name));

        if ( await existing.exists() ) {
            throw APIError.create('item_with_same_name_exists', null, {
                entry_name: name,
            });
        }

        if ( ! await parent.exists() ) {
            throw APIError.create('subject_does_not_exist');
        }

        svc_resource.register({
            uid,
            status: RESOURCE_STATUS_PENDING_CREATE,
        });

        const raw_fsentry = {
            is_dir: 1,
            uuid: uid,
            parent_uid: await parent.get('uid'),
            path: path_.join(await parent.get('path'), name),
            user_id: actor.type.user.id,
            name,
            created: ts,
            accessed: ts,
            modified: ts,
            immutable: immutable ?? false,
            ...(thumbnail ? {
                thumbnail: thumbnail,
            } : {}),
        };

        const entryOp = await this.fsEntryController.insert(raw_fsentry);

        await this.#settle_entry_op(entryOp, uid);

        const node = await svc_fs.node(new NodeUIDSelector(uid));

        svc_event.emit('fs.create.directory', {
            node,
            context: Context.get(),
        });

        return node;
    }

    /**
     * Open a read stream for a file's contents.
     *
     * @param {Object} param
     * @param {Context} param.context - Used to look up the mountpoint service.
     * @param {FSNode} param.node
     * @param {string} [param.version_id]
     * @param {*} [param.range] - Forwarded to the storage only when truthy.
     * @returns {Promise<*>} Whatever the storage's `create_read_stream` yields.
     */
    async read ({ context, node, version_id, range }) {
        const mountpoint = context.get('services').get('mountpoint');
        const storage = mountpoint.get_storage(this.constructor.name);
        const location = await node.get('s3:location') ?? {};
        const stream = (await storage.create_read_stream(await node.get('uid'), {
            // TODO: fs:decouple-s3
            bucket: location.bucket,
            bucket_region: location.bucket_region,
            version_id,
            key: location.key,
            memory_file: node.entry,
            ...(range ? { range } : {}),
        }));
        return stream;
    }

    /**
     * Fetch the entry for a node.
     *
     * Waits until either the resource service reports the resource as free
     * or the entry controller reports pending information for the node, then
     * reads the entry.
     *
     * @param {Object} param
     * @param {NodeSelector} param.selector
     * @param {Object} [param.options] - Mutated: `tracer` is filled in when
     *   missing and `trace_options` is set when `op` is present.
     * @param {Object} param.controls - Provides `log` and `provide_selector`.
     * @param {FSNode} param.node
     * @returns {Promise<Object|null>} The entry, or null when none is found.
     */
    async stat ({
        selector,
        options = {},
        controls,
        node,
    }) {
        // For Puter FS nodes, we assume we will obtain all properties from
        // fsEntryController, except for 'thumbnail' unless it's
        // explicitly requested.

        if ( options.tracer == null ) {
            options.tracer = svc_trace.tracer;
        }

        if ( options.op ) {
            options.trace_options = {
                parent: options.op.span,
            };
        }

        let entry;

        await new Promise (rslv => {
            const detachables = new MultiDetachable();

            const callback = (_resolver) => {
                detachables.as(TDetachable).detach();
                rslv();
            };

            // either the resource is free
            {
                // no detachable because waitForResource returns a
                // Promise that will be resolved when the resource
                // is free no matter what, and then it will be
                // garbage collected.
                svc_resource.waitForResource(selector).then(callback.bind(null, 'resourceService'));
            }

            // or pending information about the resource
            // becomes available
            {
                // detachable is needed here because waitForEntry keeps
                // a map of listeners in memory, and this event may
                // never occur. If this never occurs, waitForResource
                // is guaranteed to resolve eventually, and then this
                // detachable will be detached by `callback` so the
                // listener can be garbage collected.
                const det = this.fsEntryController.waitForEntry(node, callback.bind(null, 'fsEntryService'));
                if ( det ) detachables.add(det);
            }
        });

        const maybe_uid = node.uid;
        if ( svc_resource.getResourceInfo(maybe_uid) ) {
            entry = await this.fsEntryController.get(maybe_uid, options);
            controls.log.debug('got an entry from the future');
        } else {
            entry = await this.fsEntryController.find(selector, options);
        }

        if ( ! entry ) {
            if ( this.log_fsentriesNotFound ) {
                controls.log.warn(`entry not found: ${selector.describe(true)}`);
            }
        }

        if ( entry === null || typeof entry !== 'object' ) {
            return null;
        }

        if ( entry.id ) {
            controls.provide_selector(new NodeInternalIDSelector('mysql', entry.id, {
                source: 'FSNodeContext optimization',
            }));
        }

        return entry;
    }

    /**
     * Copy `source` (recursively, when it is a directory) into `parent`
     * under the name `target_name`.
     *
     * @param {Object} param
     * @param {Context} param.context
     * @param {FSNode} param.source
     * @param {FSNode} param.parent
     * @param {string} param.target_name
     * @returns {Promise<FSNode>} Node for the new copy.
     */
    async copy_tree ({ context, source, parent, target_name }) {
        // Context
        const actor = (context ?? Context).get('actor');
        const user = actor.type.user;

        const tracer = svc_trace.tracer;
        const uuid = uuidv4();
        const timestamp = Math.round(Date.now() / 1000);
        await parent.fetchEntry();
        await source.fetchEntry({ thumbnail: true });

        // New filesystem entry
        const raw_fsentry = {
            uuid,
            is_dir: source.entry.is_dir,
            ...(source.entry.is_shortcut ? {
                is_shortcut: source.entry.is_shortcut,
                shortcut_to: source.entry.shortcut_to,
            } : {}),
            parent_uid: parent.uid,
            name: target_name,
            created: timestamp,
            modified: timestamp,

            path: path_.join(await parent.get('path'), target_name),

            // if property exists but the value is undefined,
            // it will still be included in the INSERT, causing
            // an error
            ...(source.entry.thumbnail ?
                { thumbnail: source.entry.thumbnail } : {}),

            user_id: user.id,
        };

        svc_event.emit('fs.pending.file', {
            fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
            context: context,
        });

        if ( await source.get('has-s3') ) {
            Object.assign(raw_fsentry, {
                size: source.entry.size,
                associated_app_id: source.entry.associated_app_id,
                bucket: source.entry.bucket,
                bucket_region: source.entry.bucket_region,
            });

            await tracer.startActiveSpan('fs:cp:storage-copy', async span => {
                // The span is ended in `finally` so a failed copy does not
                // leave it open.
                try {
                    const progress_tracker = new UploadProgressTracker();

                    svc_event.emit('fs.storage.progress.copy', {
                        upload_tracker: progress_tracker,
                        context,
                        meta: {
                            item_uid: uuid,
                            item_path: raw_fsentry.path,
                        },
                    });

                    const storage = context.get('storage');
                    const state_copy = storage.create_copy();
                    await state_copy.run({
                        src_node: source,
                        dst_storage: {
                            key: uuid,
                            bucket: raw_fsentry.bucket,
                            bucket_region: raw_fsentry.bucket_region,
                        },
                        storage_api: { progress_tracker },
                    });
                } finally {
                    span.end();
                }
            });
        }

        await svc_size.add_node_size(undefined, source, user);

        svc_resource.register({
            uid: uuid,
            status: RESOURCE_STATUS_PENDING_CREATE,
        });

        const entryOp = await this.fsEntryController.insert(raw_fsentry);

        let node;

        const tasks = new ParallelTasks({ tracer, max: 4 });
        await context.arun('fs:cp:parallel-portion', async () => {
            // Add child copy tasks if this is a directory
            if ( source.entry.is_dir ) {
                const children = await this.fsEntryController.fast_get_direct_descendants(source.uid);
                for ( const child_uuid of children ) {
                    tasks.add('fs:cp:copy-child', async () => {
                        const child_node = await svc_fs.node(new NodeUIDSelector(child_uuid));
                        const child_name = await child_node.get('name');

                        await this.copy_tree({
                            context,
                            source: await svc_fs.node(new NodeUIDSelector(child_uuid)),
                            parent: await svc_fs.node(new NodeUIDSelector(uuid)),
                            target_name: child_name,
                        });
                    });
                }
            }

            // Add task to await entry
            tasks.add('fs:cp:entry-op', async () => {
                await this.#settle_entry_op(entryOp, uuid);
                const copy_fsNode = await svc_fs.node(new NodeUIDSelector(uuid));
                copy_fsNode.entry = raw_fsentry;
                copy_fsNode.found = true;
                copy_fsNode.path = raw_fsentry.path;

                node = copy_fsNode;

                svc_event.emit('fs.create.file', {
                    node,
                    context,
                });
            }, { force: true });

            await tasks.awaitAll();
        });

        node = node || await svc_fs.node(new NodeUIDSelector(uuid));

        // TODO: What event do we emit? How do we know if we're overwriting?
        return node;
    }

    /**
     * Move and/or rename a node.
     *
     * Updates the node's entry, rewrites the paths of its descendants and
     * emits `fs.move.file` and `fs.rename` (neither event is awaited).
     *
     * @param {Object} param
     * @param {Context} param.context
     * @param {FSNode} param.node - Node being moved; its `entry.name` and
     *   `entry.path` are updated in place.
     * @param {FSNode} param.new_parent
     * @param {string} param.new_name
     * @param {*} [param.metadata] - Stored on the entry when truthy.
     * @returns {Promise<FSNode>} The same `node` that was passed in.
     */
    async move ({ context, node, new_parent, new_name, metadata }) {
        const old_path = await node.get('path');
        const new_path = path_.join(await new_parent.get('path'), new_name);

        const old_parent_uid = await node.get('parent_uid');
        const new_parent_uid = await new_parent.get('uid');

        const op_update = await this.fsEntryController.update(node.uid, {
            // only touch parent_uid when the node actually changes parent
            ...(old_parent_uid !== new_parent_uid ? { parent_uid: new_parent_uid } : {}),
            path: new_path,
            name: new_name,
            ...(metadata ? { metadata } : {}),
        });

        node.entry.name = new_name;
        node.entry.path = new_path;

        // NOTE: this is a safeguard passed to update_child_paths to isolate
        //       changes to the owner's directory tree, but this may need to be
        //       removed in the future.
        const user_id = await node.get('user_id');

        await op_update.awaitDone();

        await svc_fs.update_child_paths(old_path, node.entry.path, user_id);

        svc_event.emit('fs.move.file', {
            context,
            moved: node,
            old_path,
        });
        svc_event.emit('fs.rename', {
            uid: await node.get('uid'),
            new_name,
        });

        return node;
    }

    /**
     * List the direct children of a directory.
     *
     * @param {Object} param
     * @param {FSNode} param.node
     * @returns {Promise<Array>} Result of the entry controller's
     *   `fast_get_direct_descendants` for the node's uid.
     */
    async readdir ({ node }) {
        const uuid = await node.get('uid');
        const child_uuids = await this.fsEntryController.fast_get_direct_descendants(uuid);
        return child_uuids;
    }

    /**
     * Check whether `parent` has a direct child called `name`.
     *
     * @param {Object} param
     * @param {FSNode} param.parent
     * @param {string} param.name
     * @returns {Promise<boolean>}
     */
    async directory_has_name ({ parent, name }) {
        const uid = await parent.get('uid');
        /* eslint-disable */
        let check_dupe = await db.read(
            'SELECT `id` FROM `fsentries` WHERE `parent_uid` = ? AND name = ? LIMIT 1',
            [uid, name],
        );
        /* eslint-enable */
        return !!check_dupe[0];
    }

    /**
     * Write a new file to the filesystem. Throws an error if the destination
     * already exists.
     *
     * @param {Object} param
     * @param {Context} param.context
     * @param {FSNode} param.parent: The parent directory of the file.
     * @param {string} param.name: The name of the file.
     * @param {File} param.file: The file to write.
     * @returns {Promise<FSNode>}
     */
    async write_new ({ context, parent, name, file }) {
        const {
            tmp, fsentry_tmp, message, actor: inputActor, app_id,
        } = context.values;
        const actor = inputActor ?? Context.get('actor');

        const uid = uuidv4();

        // determine bucket region
        const bucket_region = global_config.s3_region ?? global_config.region;
        const bucket = global_config.s3_bucket;

        if ( ! await svc_acl.check(actor, parent, 'write') ) {
            throw await svc_acl.get_safe_acl_error(actor, parent, 'write');
        }

        const path = path_.join(await parent.get('path'), name);

        const storage_resp = await this.#storage_upload({
            uuid: uid,
            bucket,
            bucket_region,
            file,
            tmp: {
                ...tmp,
                path,
            },
        });

        // Only resolve a thumbnail when one was actually requested. This
        // avoids failing after the upload when `fsentry_tmp` is absent, and
        // avoids putting `thumbnail: undefined` into the INSERT.
        if ( fsentry_tmp?.thumbnail_promise ) {
            fsentry_tmp.thumbnail = await fsentry_tmp.thumbnail_promise;
            delete fsentry_tmp.thumbnail_promise;
        }

        const timestamp = Math.round(Date.now() / 1000);
        const raw_fsentry = {
            uuid: uid,
            is_dir: 0,
            user_id: actor.type.user.id,
            created: timestamp,
            accessed: timestamp,
            modified: timestamp,
            parent_uid: await parent.get('uid'),
            name,
            size: file.size,
            path,
            ...fsentry_tmp,
            bucket_region,
            bucket,
            associated_app_id: app_id ?? null,
        };

        svc_event.emit('fs.pending.file', {
            fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
            context,
        });

        svc_resource.register({
            uid,
            status: RESOURCE_STATUS_PENDING_CREATE,
        });

        const filesize = file.size;
        svc_size.change_usage(actor.type.user.id, filesize);

        // Meter ingress
        const ownerId = await parent.get('user_id');
        const ownerActor = new Actor({
            type: new UserActorType({
                user: await get_user({ id: ownerId }),
            }),
        });

        svc_metering.incrementUsage(ownerActor, 'filesystem:ingress:bytes', filesize);

        const entryOp = await this.fsEntryController.insert(raw_fsentry);

        this.#detach('write-new.entry-op', async () => {
            await this.#settle_entry_op(entryOp, uid);

            const store_version_id = storage_resp.VersionId;
            if ( ! store_version_id ) return;

            // insert version into db
            const new_item_node = await svc_fs.node(new NodeUIDSelector(uid));
            const new_item = await new_item_node.get('entry');
            await db.write('INSERT INTO `fsentry_versions` (`user_id`, `fsentry_id`, `fsentry_uuid`, `version_id`, `message`, `ts_epoch`) VALUES (?, ?, ?, ?, ?, ?)',
                            [
                                actor.type.user.id,
                                new_item.id,
                                new_item.uuid,
                                store_version_id,
                                message ?? null,
                                timestamp,
                            ]);
        });

        const node = await svc_fs.node(new NodeUIDSelector(uid));

        svc_event.emit('fs.create.file', {
            node,
            context,
        });

        return node;
    }

    /**
     * Overwrite an existing file. Throws an error if the destination does not
     * exist.
     *
     * @param {Object} param
     * @param {Context} param.context
     * @param {FSNodeContext} param.node: The node to write to.
     * @param {File} param.file: The file to write.
     * @returns {Promise<FSNodeContext>}
     */
    async write_overwrite ({ context, node, file }) {
        const {
            tmp, fsentry_tmp, message, actor: inputActor,
        } = context.values;
        const actor = inputActor ?? Context.get('actor');

        if ( ! await svc_acl.check(actor, node, 'write') ) {
            throw await svc_acl.get_safe_acl_error(actor, node, 'write');
        }

        const uid = await node.get('uid');

        const bucket_region = node.entry.bucket_region;
        const bucket = node.entry.bucket;

        const state_upload = await this.#storage_upload({
            uuid: node.entry.uuid,
            bucket,
            bucket_region,
            file,
            tmp: {
                ...tmp,
                path: await node.get('path'),
            },
        });

        if ( fsentry_tmp?.thumbnail_promise ) {
            fsentry_tmp.thumbnail = await fsentry_tmp.thumbnail_promise;
            delete fsentry_tmp.thumbnail_promise;
        }

        const ts = Math.round(Date.now() / 1000);
        const raw_fsentry_delta = {
            modified: ts,
            accessed: ts,
            size: file.size,
            ...fsentry_tmp,
        };

        svc_resource.register({
            uid,
            status: RESOURCE_STATUS_PENDING_CREATE,
        });

        // NOTE(review): the full new size is added without subtracting the
        // size of the content being replaced — confirm this is accounted
        // for elsewhere.
        const filesize = file.size;
        svc_size.change_usage(actor.type.user.id, filesize);

        // Meter ingress
        const ownerId = await node.get('user_id');
        const ownerActor = new Actor({
            type: new UserActorType({
                user: await get_user({ id: ownerId }),
            }),
        });
        svc_metering.incrementUsage(ownerActor, 'filesystem:ingress:bytes', filesize);

        const entryOp = await this.fsEntryController.update(uid, raw_fsentry_delta);

        // depends on fsentry, does not depend on S3
        const entryOpPromise = this.#settle_entry_op(entryOp, uid);

        const cachePromise = (async () => {
            await svc_fileCache.invalidate(node);
        })();

        this.#detach('write-overwrite.post-write', async () => {
            await Promise.all([entryOpPromise, cachePromise]);
            svc_event.emit('fs.write.file', {
                node,
                context,
            });
        });

        // TODO (xiaochen): determine if this can be removed, post_insert handler need
        // to skip events from other servers (why? 1. current write logic is inside
        // the local server 2. broadcast system conduct "fire-and-forget" behavior)
        state_upload.post_insert({
            db, user: actor.type.user, node, uid, message, ts,
        });

        await cachePromise;

        return node;
    }

    /**
     * Sum the `size` of every descendant of a node (the node's own size is
     * not included).
     *
     * @param {Object} param
     * @param {FSNode} param.node
     * @returns {Promise<number>} Total size; 0 when there are no descendants.
     */
    async get_recursive_size ({ node }) {
        const uuid = await node.get('uid');
        const cte_query = `
            WITH RECURSIVE descendant_cte AS (
                SELECT uuid, parent_uid, size
                FROM fsentries
                WHERE parent_uid = ?

                UNION ALL

                SELECT f.uuid, f.parent_uid, f.size
                FROM fsentries f
                INNER JOIN descendant_cte d
                ON f.parent_uid = d.uuid
            )
            SELECT SUM(size) AS total_size FROM descendant_cte
        `;
        const rows = await db.read(cte_query, [uuid]);
        // SUM over zero rows is NULL; report that as a size of 0.
        return rows[0]?.total_size ?? 0;
    }

    // #endregion

    // #region internal

    /**
     * Run an async task in the background. A rejection is reported through
     * `extension.errors.report` instead of becoming an unhandled promise
     * rejection.
     *
     * @param {string} label - Identifies the task in the error report.
     * @param {function(): Promise} task
     */
    #detach (label, task) {
        task().catch(e => {
            extension.errors.report(`puterfs.${label}`, {
                source: e || new Error('unknown'),
                trace: true,
            });
        });
    }

    /**
     * Wait for an entry operation to finish, then free the resource
     * registered under `uid`. The resource is freed even when the operation
     * fails so that it does not stay registered as pending.
     *
     * @param {Object} entryOp - Object exposing `awaitDone()`.
     * @param {string} uid
     */
    async #settle_entry_op (entryOp, uid) {
        try {
            await entryOp.awaitDone();
        } finally {
            svc_resource.free(uid);
        }
    }

    /**
     * Upload a file's contents through the storage controller.
     *
     * Emits `fs.storage.upload-progress` before uploading and
     * `outer.fs.write-hash` once the SHA-256 of the contents is known.
     * Streamed files (no `file.buffer`) are wrapped in a stuck detector that
     * warns, and later reports an alarm, when the stream stalls.
     *
     * @param {Object} param
     * @param {string} param.uuid - Storage key / uid of the item.
     * @param {string} [param.bucket] - Defaults to `global_config.s3_bucket`.
     * @param {string} [param.bucket_region] - Defaults to
     *   `global_config.s3_region ?? global_config.region`.
     * @param {File} param.file: The file to write.
     * @param {Object} param.tmp - `tmp.path` is reported in the progress event.
     * @returns {Promise<Object>} The object returned by the storage's
     *   `create_upload()`.
     * @throws {APIError} `upload_failed` when the upload throws.
     */
    async #storage_upload ({
        uuid,
        bucket,
        bucket_region,
        file,
        tmp,
    }) {
        const storage = svc_mountpoint.get_storage(this.constructor.name);

        bucket ??= global_config.s3_bucket;
        bucket_region ??= global_config.s3_region ?? global_config.region;

        const upload_tracker = new UploadProgressTracker();

        svc_event.emit('fs.storage.upload-progress', {
            upload_tracker,
            context: Context.get(),
            meta: {
                item_uid: uuid,
                item_path: tmp.path,
            },
        });

        let alarm_timeout = null;
        let upload_settled = false;
        const clear_alarm = () => {
            clearTimeout(alarm_timeout);
            alarm_timeout = null;
        };

        let hashPromise;
        if ( file.buffer ) {
            const hash = crypto.createHash('sha256');
            hash.update(file.buffer);
            hashPromise = Promise.resolve(hash.digest('hex'));
        } else {
            const monitored_stream = stuck_detector_stream(file.stream, {
                timeout: STUCK_STATUS_TIMEOUT,
                on_stuck: () => {
                    console.warn('Upload stream stuck might be stuck', {
                        bucket_region,
                        bucket,
                        uuid,
                    });
                    // Never stack timers, and never arm one for an upload
                    // that has already finished or failed.
                    clear_alarm();
                    if ( upload_settled ) return;
                    alarm_timeout = setTimeout(() => {
                        extension.errors.report('fs.write.s3-upload', {
                            message: 'Upload stream stuck for too long',
                            alarm: true,
                            extra: {
                                bucket_region,
                                bucket,
                                uuid,
                            },
                        });
                    }, STUCK_ALARM_TIMEOUT);
                },
                on_unstuck: clear_alarm,
            });
            const hs = hashing_stream(monitored_stream);
            // copy so the caller's file object is not modified
            file = { ...file, stream: hs.stream };
            hashPromise = hs.hashPromise;
        }

        hashPromise.then(hash => {
            svc_event.emit('outer.fs.write-hash', {
                hash, uuid,
            });
        }).catch(e => {
            // An upload failure is reported separately below; this only
            // keeps a failed hash from becoming an unhandled rejection.
            console.warn('Failed to publish upload hash', { uuid, error: e });
        });

        const state_upload = storage.create_upload();

        try {
            await this.storageController.upload({
                uid: uuid,
                file,
                storage_meta: { bucket, bucket_region },
                storage_api: { progress_tracker: upload_tracker },
            });
        } catch (e) {
            extension.errors.report('fs.write.storage-upload', {
                source: e || new Error('unknown'),
                trace: true,
                alarm: true,
                extra: {
                    bucket_region,
                    bucket,
                    uuid,
                },
            });
            throw APIError.create('upload_failed');
        } finally {
            upload_settled = true;
            clear_alarm();
        }

        return state_upload;
    }

    /**
     * Remove a node: adjust the owner's usage, meter the deletion, then
     * delete the entry and (when the node has stored contents) the stored
     * object, in parallel.
     *
     * @param {Object} param
     * @param {FSNode} param.node
     * @param {Object} param.options - `override_immutable` allows removing
     *   an immutable node.
     * @throws {APIError} 403 when the node is immutable and
     *   `override_immutable` is not set.
     */
    async #rmnode ({ node, options }) {
        if ( !options.override_immutable && await node.get('immutable') ) {
            throw new APIError(403, 'File is immutable.');
        }

        const userId = await node.get('user_id');
        const fileSize = await node.get('size');
        svc_size.change_usage(userId,
                        -1 * fileSize);

        const ownerActor = new Actor({
            type: new UserActorType({
                user: await get_user({ id: userId }),
            }),
        });

        svc_metering.incrementUsage(ownerActor, 'filesystem:delete:bytes', fileSize);

        const tracer = svc_trace.tracer;
        const tasks = new ParallelTasks({ tracer, max: 4 });

        tasks.add('remove-fsentry', async () => {
            await this.fsEntryController.delete(await node.get('uid'));
        });

        if ( await node.get('has-s3') ) {
            tasks.add('remove-from-s3', async () => {
                const storage = Context.get('storage');
                const state_delete = storage.create_delete();
                await state_delete.run({
                    node: node,
                });
            });
        }

        await tasks.awaitAll();
    }
    // #endregion
}
